package com.sohu2;

import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Demo producer for a multi-broker Kafka 0.8 cluster. Publishes ten keyed
 * messages to the topic named on the command line, letting the configured
 * custom partitioner (com.sohu2.PartitionerDemo) route each message by key.
 */
public class MultiBrokerProducer {
	// String keys so they match the configured key serializer
	// (kafka.serializer.StringEncoder); an Integer key would hit a
	// ClassCastException inside the old producer's encoder.
	private static Producer<String, String> producer;

	private final Properties props = new Properties();

	/** Configures the producer against the three local brokers. */
	public MultiBrokerProducer() {
		props.put("metadata.broker.list", "192.168.200.252:9092,192.168.200.252:9093,192.168.200.252:9094");
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		props.put("key.serializer.class", "kafka.serializer.StringEncoder");
		props.put("partitioner.class", "com.sohu2.PartitionerDemo");
		// acks=1: wait for the partition leader to acknowledge each write.
		props.put("request.required.acks", "1");

		ProducerConfig config = new ProducerConfig(props);
		producer = new Producer<String, String>(config);
	}

	public static void main(String[] args) {
		// Fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException.
		if (args.length < 1) {
			System.err.println("Usage: java MultiBrokerProducer <topic>");
			return;
		}
		MultiBrokerProducer sp = new MultiBrokerProducer();
		Random rnd = new Random();
		String topic = args[0];
		for (long messCount = 0; messCount < 10; messCount++) {
			String key = String.valueOf(rnd.nextInt(255));
			String msg = "This message is for key - " + key;
			System.out.println("this message is : " + msg);
			// Pass the key explicitly: the original omitted it, so the custom
			// partitioner always saw a null key and could not route by it.
			KeyedMessage<String, String> data1 = new KeyedMessage<String, String>(topic, key, msg);
			producer.send(data1);
		}
		producer.close();
	}

	// Once your multibroker cluster is up, create a topic with five partitions
	// and set the
	// replication factor as 2 before running this program using the following
	// command:
	// [root@localhost kafka-0.8]# bin/kafka-topics.sh --zookeeper
	// localhost:2181 --create --topic kafkatopic --partitions 5
	// --replication-factor 2

	// [root@localhost kafka-0.8]# java MultiBrokerProducer kafkatopic
	// For verifying the data that is getting published to the Kafka broker, the
	// Kafka console
	// consumer can be used as follows:
	// [root@localhost kafka-0.8]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
}